Spark 大数据中文分词统计(二) Java语言实现分词统计

您所在的位置:网站首页 大数据 分词 Spark 大数据中文分词统计(二) Java语言实现分词统计

Spark 大数据中文分词统计(二) Java语言实现分词统计

2024-07-09 14:26| 来源: 网络整理| 查看: 265

           上一篇文章中完成了Windows环境下Spark开发环境的搭建,这一篇来谈一下使用Java语言,基于纯Java

语言、使用MapReduce模式以及Spark框架进行中文分词统计的编程实践。

           进行中文处理,中文分词是首先要考虑的。这里选用了IKAnalyzer,因为原来做论文时用过,接口简单,

使用方便,而且开源,也很好设定。

           下载地址为:http://git.oschina.net/wltea/IK-Analyzer-2012FF

           当然也可以采用其他的中文分词组件。

           在Scala IDE上创建好的工程结构如下:

            程序运行的结果如下:

         我们可以看到,唐诗宋词中,出现最多的两字词有 何处,相思,明月,春风,无人,天涯,流水,江南等等。可见,千百年后,

穿越时空而触动我们心灵的,没有一个是当时的人们孜孜以求的。所以,我们今天念念不忘的身外之物,耿耿于怀的内心遗憾,其实

都不过是暂时的虚幻,随风而逝,不留痕迹,又何必太在意呢?

          工程源码已经上传到CSDN:http://download.csdn.net/detail/yangdanbo1975/9602164

           当然,要运行工程文件的话,不要忘记了添加必须的类库:(如果你按上一篇的步骤完成安装的话,所有的库本地都有了)

          程序中,总共采用Pure Java,MapReduce和Spark 框架(Java语言)三种方式实现了对唐诗三百首,宋词三百首,论语,孟子,老子,庄子

的中文分词统计,可以选词长,也可选TopN,当然也可以通过文件浏览选择其他的文件来源:

         程序的主运行文件是WordCounter,右键单击可以Java Application运行。

         第一种方法是使用Pure Java语言实现中文分词的统计。主要的方法是WordCounter中的 pureJavaWordCount() :

         private void pureJavaWordCount() {                   // 未选择文档,使用下拉框预设文档                  Map wordCount = null;                  if (docField.getText().isEmpty()) {                         wordCount = getWordCount(docBox.getSelectedItem().toString(),                                false, lengthsBox.getSelectedIndex());                  } else {                         wordCount = getWordCount(docField.getText(), true,                                lengthsBox.getSelectedIndex());                  }                  // 显示分词结果                  if (wordCount == null) {                        JOptionPane.showMessageDialog(null, "分词统计结果为空!\n请检查程序或重新选择文档!");                  } else {                        if (rdoTop50.isSelected()) {                                showWordCount(wordCount, 50);                        } else if (rdoTop100.isSelected()) {                                showWordCount(wordCount, 100);                        } else if (rdoTop500.isSelected()) {                                showWordCount(wordCount, 500);                        } else if (rdoTop1000.isSelected()) {                                 showWordCount(wordCount, 1000);                        } else if (rdoAll.isSelected()) {                                 showWordCount(wordCount, 0);                  }              }          }

         代码很简单,唯一值得一提的是,参照网络上的例子,写了一泛型的SortableMap,实现了

按key和value升序或降序排序的功能,使最终输出的分词统计结果可以按词频排序:

         SortableMap words = new SortableMap();

         Map wordCount = words.sortMapByValue(false);

         第二种方法是使用MapReduce模式,进行中文分词统计及按词频排序输出。

         主要的Class是HadoopWordCount,和网上的大多数例子是一样的,不同之处在于在TokenizerMapper类的

Map方法中加入了中文分词的逻辑:

         public void map(Object key, Text value, Context context)                    throws IOException, InterruptedException {                String line = value.toString().toLowerCase(); // 全部转为小写字母                /*                 * line = line.replaceAll(pattern, " "); // 将非0-9, a-z, A-Z的字符替换为空格                 * StringTokenizer itr = new StringTokenizer(line); while                 * (itr.hasMoreTokens()) { word.set(itr.nextToken());                 * context.write(word, one); }                 */

               try {                        InputStream is = new ByteArrayInputStream(                                   line.getBytes("UTF-8"));                        IKSegmenter seg = new IKSegmenter(new InputStreamReader(is), false);

                       Lexeme lex = seg.next();

                       while (lex != null) {                               String text = lex.getLexemeText();                               word.set(text);                               context.write(word, one);                               // output.collect(word, one);                                    lex = seg.next();                        }

                 } catch (UnsupportedEncodingException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                  } catch (IOException e) {                         // TODO Auto-generated catch block                         e.printStackTrace();                   }             }      }

     在IntSumReducer类的reduce方法的最后,把分词统计的结果保存到全局变量中:

     //保存结果       totalWords.put(key.toString(), Integer.valueOf(sum));

     因为当从WordCounter的Java GUI界面调用HadoopWordCount功能时,需要使用全局变量返回结果。

     第一个MapReduce job完成后,输出结果在C:\\hadoop目录中。

     最后就是新增了一个IntWritableDecreasingComparator,在Map Reduce的第一个Job完成之后,再进行一个

sortJob,以前面的C:\\hadoop作为input,先用hadoop库提供的InverseMapper交换Map中的key和value,然后在

sortJob中设定:

     sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);

     实现了按词频排序的结果输出。这是输出的结果是放在C:\\hadoopsort目录下。

     本来我想再增加一个RestoreJob,把key和value在交换回来,写入文件。但是由于对Hadoop不熟悉,而

InverseMapper只能实现key和value交换,并不能实现value和key交换,自己有没有能力重写一个,所以只能放弃了。

      HadoopWordCount有两种运行方式,单独main方法命令行方式运行时,会生成输出文件夹;当从WordCounter

中的入口方法:hadoopWordCount() 中调用的,会输出分词结果到GUI,这时对应的输出文件夹没有输出。

     第三种方法 是使用Spark 框架,完成中文分词统计功能,实现类为SparkWordCount。

     基本方法和网上的大多数例子没有差别,只是增加了一个getSplitWords 的方法,替换掉原来的简单split:

        /**          * 4、将行文本内容拆分为多个单词          * lines调用flatMap这个transformation算子(参数类型是FlatMapFunction接口实现类)          * 返回每一行的每个单词          * 加入了中文分词的功能,调用分词后的list结果          */         JavaRDD words = lines.flatMap(new FlatMapFunction(){             private static final long serialVersionUID = -3243665984299496473L;             @Override             public Iterable call(String line) throws Exception {                 //return Arrays.asList(line.split("\t"));              return getSplitWords(line);             }         });

        再就是增加了一个排序,先交换key和value,再按value排序,最后再交换回来:

        public JavaPairRDD sortByValue(JavaPairRDD wordCount, boolean isAsc){                //added by Dumbbell Yang at 2016-08-03                //加入按词频排序功能                //先把key和value交换,然后按sortByKey,最后再交换回去                JavaPairRDD pairs2 = wordCount.mapToPair(new PairFunction() {                      private static final long serialVersionUID = -7879847028195817508L;                             @Override                             public Tuple2 call(Tuple2 word) throws Exception {                                     return new Tuple2(word._2, word._1);                             }                       });                          //降序                  pairs2 = pairs2.sortByKey(isAsc);                          //再次交换key和value                  wordCount = pairs2.mapToPair(new PairFunction() {                             private static final long serialVersionUID = -7879847028195817509L;                                     @Override                                      public Tuple2 call(Tuple2 word) throws Exception {                                              return new Tuple2(word._2, word._1);                                      }                            });                         return wordCount;      }

     要注意的是,开始的conf中要设置为conf.setMaster("local");设定为local standalone模式启动spark job。

     SparkWordCount同样有两种方式调用,一种是用main方法,在命令行运行,结果会输出到控制台;另外一种

是从WordCounter中的private void sparkJavaWordCount() 方法进入,分词统计结果会显示在GUI界面上。

      上面就是采用Java语言实现的Spark中文分词统计功能。

      下一篇,将会采用Scala语言实现Spark中文分词统计。




【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3